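Saving streaming data in SQL Server 2012
Keywords: save, data, 2012, SQL Server | Updated: 2023-09-27 18:33:20
I am trying to save data to SQL Server from a C# console application that uses the Twitter Streaming API. I tried saving the data with an ordinary insert, but it did not work. Can anyone suggest how I should store streaming data?
Here is my C# code. Thanks in advance.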
static void Main(string[] args)
{
    TwitterCredentials.SetCredentials("xxxx,xxxx,xxxx,xxxx");
    Stream_FilteredStreamExample();
}

private static void Stream_FilteredStreamExample()
{
    SqlConnection conn = new SqlConnection(@"Data Source=xxxx;Initial Catalog=Surveillance;Integrated Security=True");
    conn.Open();
    var stream = Stream.CreateFilteredStream();
    stream.AddTrack("ebola");
    stream.MatchingTweetAndLocationReceived += (sender, args) =>
    {
        if (!args.Tweet.IsRetweet)
        {
            var tweet = args.Tweet;
            if (args.Tweet.Coordinates != null)
            {
                SqlCommand myCommand = new SqlCommand("INSERT INTO TwitterDatabase(Tweet,Latitude,Longitude) values('" + tweet.Text + "','" + tweet.Coordinates.Latitude + "','" + tweet.Coordinates.Longitude + "')", conn);
                Console.WriteLine("tweets:{0}", tweet.Text);
                Console.WriteLine("(Coardinates{0}, {1})", tweet.Coordinates.Latitude, tweet.Coordinates.Longitude);
            }
        }
    };
    stream.StartStreamMatchingAnyCondition();
    conn.Close();
}
In the database, the columns have these data types:
Tweet = varchar(150)
Latitude = float
Longitude = float
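Your connection is already closed by the time you try to save the tweet. The code in the question also builds the SqlCommand without ever calling ExecuteNonQuery, so no INSERT is sent to the server. Open the connection inside the event handler, save the tweet there, and execute the command: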
private static void Stream_FilteredStreamExample()
{
    var stream = Stream.CreateFilteredStream();
    stream.AddTrack("ebola");
    stream.MatchingTweetAndLocationReceived += (sender, args) =>
    {
        if (!args.Tweet.IsRetweet)
        {
            // Open a connection per event; the using block closes it again.
            using (SqlConnection conn = new SqlConnection(@"Data Source=xxxx;Initial Catalog=Surveillance;Integrated Security=True"))
            {
                conn.Open();
                var tweet = args.Tweet;
                if (args.Tweet.Coordinates != null)
                {
                    // Note the opening parenthesis after "values"; without it the statement is invalid.
                    using (SqlCommand myCommand = new SqlCommand("INSERT INTO TwitterDatabase(Tweet,Latitude,Longitude) values('" + tweet.Text + "','" + tweet.Coordinates.Latitude + "','" + tweet.Coordinates.Longitude + "')", conn))
                    {
                        myCommand.ExecuteNonQuery();
                    }
                }
            }
        }
    };
    stream.StartStreamMatchingAnyCondition();
}
You really should look into parameterized queries. As written, the code is exposed to SQL injection.
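To make the injection point concrete, here is a minimal sketch that puts the two styles side by side. The class and method names (TweetInsertSketch, BuildConcatenatedSql, BuildInsertCommand) are hypothetical and only for illustration; the table and column names and the varchar(150)/float types are taken from the question. With concatenation, any tweet containing an apostrophe (for example "it's here") ends the string literal early and changes the statement. With parameters, the text is sent as a value and never parsed as SQL.

```csharp
using System;
using System.Data;
using System.Data.SqlClient;

static class TweetInsertSketch
{
    // Unsafe: mirrors the string concatenation used above.
    // An apostrophe inside 'text' terminates the SQL string literal early.
    public static string BuildConcatenatedSql(string text, double lat, double lon)
    {
        return "INSERT INTO TwitterDatabase(Tweet,Latitude,Longitude) values('"
            + text + "','" + lat + "','" + lon + "')";
    }

    // Safer: the SQL text is fixed and the values travel as typed parameters.
    // Types and sizes are assumptions based on the schema in the question.
    public static SqlCommand BuildInsertCommand(SqlConnection conn, string text, double lat, double lon)
    {
        var cmd = new SqlCommand(
            "INSERT INTO TwitterDatabase(Tweet,Latitude,Longitude) " +
            "VALUES(@Tweet, @Latitude, @Longitude)", conn);
        cmd.Parameters.Add("@Tweet", SqlDbType.VarChar, 150).Value = text;
        cmd.Parameters.Add("@Latitude", SqlDbType.Float).Value = lat;
        cmd.Parameters.Add("@Longitude", SqlDbType.Float).Value = lon;
        return cmd;
    }
}
```

Inside the event handler this would be used as `using (var cmd = TweetInsertSketch.BuildInsertCommand(conn, tweet.Text, tweet.Coordinates.Latitude, tweet.Coordinates.Longitude)) { cmd.ExecuteNonQuery(); }` with an open connection. Declaring the parameter types explicitly also avoids formatting the coordinates as culture-dependent strings, which the concatenated version does implicitly.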
The data I am trying to store is structured, so the following code worked for me for saving it to the database.
stream.MatchingTweetAndLocationReceived += (sender, args) =>
{
    if (!args.Tweet.IsRetweet)
    {
        using (SqlConnection conn = new SqlConnection(@"Data Source=WIN-PAL1Q8DR163\AVINASH;Initial Catalog=Surveillance;Integrated Security=True"))
        {
            conn.Open();
            var tweet = args.Tweet;
            if (args.Tweet.Coordinates != null)
            {
                try
                {
                    // Parameterized INSERT: the values are never spliced into the SQL text.
                    using (SqlCommand myCommand = new SqlCommand("INSERT INTO TwitterDatabase(Tweet,Latitude,Longitude) VALUES(@Tweeets, @LatCoordinate, @LongCoordinate)", conn))
                    {
                        myCommand.Parameters.Add(new SqlParameter("Tweeets", tweet.Text));
                        myCommand.Parameters.Add(new SqlParameter("LatCoordinate", tweet.Coordinates.Latitude));
                        myCommand.Parameters.Add(new SqlParameter("LongCoordinate", tweet.Coordinates.Longitude));
                        myCommand.ExecuteNonQuery();
                    }
                }
                catch (Exception ex)
                {
                    // Report the reason instead of swallowing the exception silently.
                    Console.WriteLine("Could not insert: " + ex.Message);
                }
            }
        }
    }
};
stream.StartStreamMatchingAnyCondition();
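On the original question of which approach suits streaming data: the versions above do the database work directly inside the stream's event handler, so a slow insert delays the handling of the next tweet. One possible alternative, sketched below under stated assumptions, is to hand each tweet to an in-memory queue and let a single background task do the writing. TweetRow and TweetWriterQueue are hypothetical names introduced for this illustration, and the save delegate stands in for whatever performs the parameterized INSERT. Only BlockingCollection<T> and Task from the .NET base class library are used.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical container for one geotagged tweet.
sealed class TweetRow
{
    public string Text { get; private set; }
    public double Latitude { get; private set; }
    public double Longitude { get; private set; }

    public TweetRow(string text, double latitude, double longitude)
    {
        Text = text;
        Latitude = latitude;
        Longitude = longitude;
    }
}

// Hypothetical helper: buffers rows and writes them on one background task.
sealed class TweetWriterQueue : IDisposable
{
    private readonly BlockingCollection<TweetRow> _queue;
    private readonly Task _worker;

    public TweetWriterQueue(Action<TweetRow> save, int capacity = 1000)
    {
        _queue = new BlockingCollection<TweetRow>(capacity);
        _worker = Task.Run(() =>
        {
            // Blocks while the queue is empty; the loop ends once
            // CompleteAdding() has been called and the queue has drained.
            foreach (var row in _queue.GetConsumingEnumerable())
            {
                try
                {
                    save(row);
                }
                catch (Exception ex)
                {
                    Console.Error.WriteLine("Could not insert: " + ex.Message);
                }
            }
        });
    }

    // Call from the stream event handler. Returns false when the buffer is full,
    // so the handler never blocks on the database. Must not be called after Dispose.
    public bool TryEnqueue(TweetRow row)
    {
        return _queue.TryAdd(row);
    }

    // Stops accepting rows, waits for the pending ones to be written, then cleans up.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Wait();
        _queue.Dispose();
    }
}
```

In the handler, the body would shrink to something like `queue.TryEnqueue(new TweetRow(tweet.Text, tweet.Coordinates.Latitude, tweet.Coordinates.Longitude));`, with the save delegate opening a connection and executing the parameterized INSERT. The bounded capacity is a design choice: when the database falls behind, tweets are dropped rather than letting memory grow without limit. Whether that trade-off is acceptable depends on the application.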